
Feauture/kernel implementation #284

Open

Edwardvaneechoud wants to merge 44 commits into main from
feauture/kernel-implementation

Conversation

@Edwardvaneechoud
Owner

No description provided.

* Add Docker-based kernel system for isolated Python code execution

Introduces two components:
- kernel_runtime/: Standalone FastAPI service that runs inside Docker
  containers, providing code execution with artifact storage and
  parquet-based data I/O via the flowfile client API
- flowfile_core/kernel/: Orchestration layer that manages kernel
  containers (create, start, stop, delete, execute) using docker-py,
  with full REST API routes integrated into the core backend

https://claude.ai/code/session_013cuypEib6mAn3uGJNmqnfk
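
As a point of reference, here is a minimal sketch of how a client could drive the kernel runtime's REST interface with httpx. The /health and /execute endpoints are named in this PR; the payload and response fields shown are illustrative assumptions, not the actual ExecuteRequest schema.

```python
# Illustrative only: endpoint names follow the commit above; the payload
# fields are assumptions rather than the real request/response models.
import httpx

KERNEL_URL = "http://localhost:8765"  # hypothetical host port mapped to the container


def run_snippet(code: str) -> dict:
    with httpx.Client(base_url=KERNEL_URL, timeout=30.0) as client:
        client.get("/health").raise_for_status()              # container is up
        response = client.post("/execute", json={"code": code})
        response.raise_for_status()
        return response.json()                                 # stdout, stderr, artifacts, ...


if __name__ == "__main__":
    print(run_snippet("print('hello from the kernel')"))
```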

* Add python_script node type for kernel-based code execution

- PythonScriptInput/NodePythonScript schemas in input_schema.py
- add_python_script method in flow_graph.py that stages input
  parquet to shared volume, executes on kernel, reads output back
- get_kernel_manager singleton in kernel/__init__.py
- python_script node template registered in node_store

https://claude.ai/code/session_013cuypEib6mAn3uGJNmqnfk

* Add integration tests for Docker-based kernel system

- kernel_fixtures.py: builds the flowfile-kernel Docker image, creates
  a KernelManager with a temp shared volume, starts a container, and
  tears everything down via a managed_kernel() context manager
- conftest.py: adds session-scoped kernel_manager fixture
- test_kernel_integration.py: full integration tests covering:
  - TestKernelRuntime: health check, stdout/stderr capture, syntax
    errors, artifact publish/list, parquet read/write round-trip,
    multiple named inputs, execution timing
  - TestPythonScriptNode: python_script node passthrough and transform
    via FlowGraph.run_graph(), plus missing kernel_id error handling
- manager.py: expose shared_volume_path as public property
- flow_graph.py: use public property instead of private attribute

https://claude.ai/code/session_013cuypEib6mAn3uGJNmqnfk
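
The managed_kernel() fixture above is essentially a setup/teardown wrapper around the manager lifecycle. A rough sketch of that pattern, assuming a factory that takes a shared-volume path and synchronous create/start/stop/delete methods (the real signatures may differ):

```python
# Sketch of a managed_kernel()-style helper; method names mirror the commits,
# the factory signature and return values are assumptions.
import tempfile
from contextlib import contextmanager


@contextmanager
def managed_kernel(manager_factory):
    """Create a manager with a temp shared volume, start one kernel, always tear down."""
    with tempfile.TemporaryDirectory() as shared_volume:
        manager = manager_factory(shared_volume_path=shared_volume)
        kernel_id = manager.create_kernel()
        try:
            manager.start_kernel(kernel_id)
            yield manager, kernel_id
        finally:
            manager.stop_kernel(kernel_id)
            manager.delete_kernel(kernel_id)
```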

* update poetry version

* Fix kernel system: singleton routing, state machine, sync execution, port resilience

- routes.py: use get_kernel_manager() singleton instead of creating a
  separate KernelManager instance (was causing dual-state bug)
- models.py: replace RUNNING with IDLE/EXECUTING states; store
  memory_gb, cpu_cores, gpu on KernelInfo from KernelConfig
- manager.py: add _reclaim_running_containers() on init to discover
  existing flowfile-kernel-* containers and reclaim their ports;
  port allocation now scans for available ports instead of incrementing;
  add execute_sync() using httpx.Client for clean sync usage;
  state transitions: IDLE -> EXECUTING -> IDLE during execute()
- flow_graph.py: use execute_sync() instead of fragile
  asyncio.run/ThreadPoolExecutor dance
- test: update state assertion from "running" to "idle"

https://claude.ai/code/session_013cuypEib6mAn3uGJNmqnfk
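
A compact sketch of what execute_sync() with the IDLE -> EXECUTING -> IDLE transitions amounts to; the state strings and httpx.Client usage come from this commit, the surrounding attributes are assumptions.

```python
# Sketch only: kernel.state and kernel.port are assumed attributes of KernelInfo.
import httpx


def execute_sync(kernel, code: str, timeout: float = 300.0) -> dict:
    if kernel.state != "idle":
        raise RuntimeError(f"Kernel is {kernel.state!r}, expected 'idle'")
    kernel.state = "executing"
    try:
        with httpx.Client(timeout=timeout) as client:
            response = client.post(
                f"http://localhost:{kernel.port}/execute", json={"code": code}
            )
            response.raise_for_status()
            return response.json()
    finally:
        kernel.state = "idle"  # always return to idle, even on failure
```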

* Fix kernel health check and test fixture resilience

- _wait_for_healthy: catch all httpx errors (including
  RemoteProtocolError) during startup polling, not just
  ConnectError/ReadError/ConnectTimeout
- conftest kernel_manager fixture: wrap managed_kernel() in
  try/except so container start failures produce pytest.skip
  instead of ERROR

https://claude.ai/code/session_013cuypEib6mAn3uGJNmqnfk
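
The broadened health check boils down to treating any transport-level httpx error as "not ready yet". A sketch of that polling loop (timings are assumptions):

```python
# Catch the whole httpx error hierarchy so RemoteProtocolError during startup
# counts as "retry" rather than "fail". Interval and timeout values are assumed.
import time

import httpx


def wait_for_healthy(base_url: str, timeout: float = 120.0, interval: float = 0.5) -> None:
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            if httpx.get(f"{base_url}/health", timeout=2.0).status_code == 200:
                return
        except httpx.HTTPError:
            pass  # ConnectError, ReadError, RemoteProtocolError, timeouts, ...
        time.sleep(interval)
    raise TimeoutError(f"Kernel at {base_url} did not become healthy within {timeout}s")
```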

* removing breakpoint

* Run kernel integration tests in parallel CI worker

- Add pytest.mark.kernel marker to test_kernel_integration.py
- Register 'kernel' marker in pyproject.toml
- Exclude kernel tests from main backend-tests with -m "not kernel"
  (both Linux and Windows jobs)
- Add dedicated kernel-tests job that runs in parallel:
  builds Docker image, runs -m kernel tests, 15min timeout
- Add kernel_runtime paths to change detection filters
- Include kernel-tests in test-summary aggregation

https://claude.ai/code/session_013cuypEib6mAn3uGJNmqnfk

* Remove --timeout flag from kernel CI step (pytest-timeout not installed)

The job-level timeout-minutes: 15 already handles this.

https://claude.ai/code/session_013cuypEib6mAn3uGJNmqnfk

* Add unit tests for kernel_runtime (artifact_store, flowfile_client, endpoints)

42 tests covering the three kernel_runtime modules:
- artifact_store: publish/get/list/clear, metadata, thread safety
- flowfile_client: context management, parquet I/O, artifacts
- main.py endpoints: /health, /execute, /artifacts, /clear, parquet round-trips

https://claude.ai/code/session_013cuypEib6mAn3uGJNmqnfk

* Add *.egg-info/ to .gitignore

https://claude.ai/code/session_013cuypEib6mAn3uGJNmqnfk

* Add kernel_runtime unit tests to CI kernel-tests job

Installs kernel_runtime with test deps and runs its 42 unit tests
before the Docker-dependent integration tests.

* Add kernel management UI for Python execution environments

Provides a visual interface for managing Docker-based kernel containers
used by Python Script nodes. Users can create kernels with custom packages
and resource limits, monitor status (stopped/starting/idle/executing/error),
and control lifecycle (start/stop/delete) with auto-polling for live updates.

https://claude.ai/code/session_01VvVDasZtEyGEa6TjE3Xjxm

* Update package-lock.json version to match package.json

https://claude.ai/code/session_01VvVDasZtEyGEa6TjE3Xjxm

* Handle Docker unavailable gracefully with 503 and error banner

The kernel routes now catch DockerException during manager init and
return a 503 with a clear message instead of crashing with a 500.
The frontend surfaces this as a red error banner at the top of the
Kernel Manager page so users know Docker needs to be running.

https://claude.ai/code/session_01VvVDasZtEyGEa6TjE3Xjxm
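
In route terms, the pattern is to catch DockerException when the manager (and its Docker client) is first needed and translate it into a 503. A sketch, with the route shape and import path assumed:

```python
# Sketch of the 503 translation; get_kernel_manager is the singleton added in
# this PR, but the import path and route shape here are assumptions.
from docker.errors import DockerException
from fastapi import APIRouter, HTTPException

from flowfile_core.kernel import get_kernel_manager

router = APIRouter(prefix="/kernels")


@router.get("")
def list_kernels():
    try:
        manager = get_kernel_manager()  # may construct the Docker client lazily
    except DockerException:
        raise HTTPException(
            status_code=503,
            detail="Docker is not available. Start Docker and try again.",
        )
    return manager.list_kernels()
```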

* Add /kernels/docker-status endpoint and proactive UI feedback

New GET /kernels/docker-status endpoint checks Docker daemon reachability
and whether the flowfile-kernel image exists. The UI calls this on page
load and shows targeted banners: red for Docker not running, yellow for
missing kernel image, so users know exactly what to fix before creating
kernels.

https://claude.ai/code/session_01VvVDasZtEyGEa6TjE3Xjxm

* Center kernel manager page with margin auto and padding

Match the layout pattern used by other views (SecretsView, DatabaseView)
with max-width 1200px, margin 0 auto, and standard spacing-5 padding.

https://claude.ai/code/session_01VvVDasZtEyGEa6TjE3Xjxm

* Add ArtifactContext for tracking artifact metadata across FlowGraph

Introduces an ArtifactContext class that tracks which Python artifacts
are published and consumed by python_script nodes, enabling visibility
into artifact availability based on graph topology and kernel isolation.

- Create artifacts.py with ArtifactRef, NodeArtifactState, ArtifactContext
- Integrate ArtifactContext into FlowGraph.__init__
- Add _get_upstream_node_ids and _get_required_kernel_ids helpers
- Clear artifact context at flow start in run_graph()
- Compute available artifacts before and record published after execution
- Add clear_artifacts_sync to KernelManager for non-async clearing
- Add 32 unit tests for ArtifactContext (test_artifact_context.py)
- Add 7 FlowGraph integration tests (test_flowfile.py)
- Add 5 kernel integration tests (test_kernel_integration.py)

https://claude.ai/code/session_01Pz85VwuSqBzovhjEEtweYN
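
For a sense of the shape of this tracker, a stripped-down sketch of ArtifactRef / NodeArtifactState / ArtifactContext; the class names come from the commit, the fields and methods shown are assumptions.

```python
# Minimal sketch, not the real implementation: fields and methods beyond
# record_published are assumptions.
from dataclasses import dataclass, field


@dataclass(frozen=True)
class ArtifactRef:
    kernel_id: str
    name: str
    node_id: int


@dataclass
class NodeArtifactState:
    published: list[ArtifactRef] = field(default_factory=list)


class ArtifactContext:
    def __init__(self) -> None:
        self._node_states: dict[int, NodeArtifactState] = {}

    def record_published(self, node_id: int, ref: ArtifactRef) -> None:
        self._node_states.setdefault(node_id, NodeArtifactState()).published.append(ref)

    def available_for(self, upstream_ids: set[int]) -> list[ArtifactRef]:
        # Visibility follows graph topology: only upstream publications count.
        return [
            ref
            for up_id in upstream_ids
            for ref in self._node_states.get(up_id, NodeArtifactState()).published
        ]
```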

* Add delete_artifact support, duplicate publish prevention, and model training integration test

- ArtifactStore.publish() now raises ValueError if artifact name already exists
- Added ArtifactStore.delete() and flowfile_client.delete_artifact()
- ExecuteResult/ExecuteResponse track artifacts_deleted alongside artifacts_published
- ArtifactContext.record_deleted() removes artifacts from kernel index and published lists
- flow_graph.add_python_script records deletions from execution results
- Integration test: train numpy linear regression in node A, apply predictions in node B
- Integration test: publish -> use & delete -> republish -> access flow
- Integration test: duplicate publish without delete raises error
- Unit tests for all new functionality across kernel_runtime and flowfile_core

https://claude.ai/code/session_01Pz85VwuSqBzovhjEEtweYN

* Support N inputs per name in kernel execution with read_first convenience method

- Change input_paths from dict[str, str] to dict[str, list[str]] across
  ExecuteRequest models (kernel_runtime and flowfile_core)
- read_input() now scans all paths for a name and concatenates them (union),
  supporting N upstream inputs under the same key (e.g. "main")
- Add read_first() convenience method that reads only input_paths[name][0]
- read_inputs() updated to handle list-based paths
- add_python_script now accepts *flowfile_tables (varargs) and writes each
  input to main_0.parquet, main_1.parquet, etc.
- All existing tests updated to use list-based input_paths format
- New tests: multi-main union, read_first, read_inputs with N paths

https://claude.ai/code/session_01Pz85VwuSqBzovhjEEtweYN
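
Conceptually, read_input() unions every parquet registered under a name while read_first() takes only the first path. A sketch with polars, using hypothetical file paths:

```python
# Sketch of the list-based input handling; the paths below are hypothetical.
import polars as pl

input_paths: dict[str, list[str]] = {
    "main": ["/shared/1/5/inputs/main_0.parquet", "/shared/1/5/inputs/main_1.parquet"],
}


def read_input(name: str = "main") -> pl.LazyFrame:
    # Union of every upstream input registered under `name`.
    return pl.concat([pl.scan_parquet(p) for p in input_paths[name]], how="vertical")


def read_first(name: str = "main") -> pl.LazyFrame:
    # Convenience accessor for the first (often only) input.
    return pl.scan_parquet(input_paths[name][0])
```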

* adding multiple paths

* Fix O(N) deletion, deprecated asyncio, naive datetimes, broad exceptions, global context, and hardcoded timeout

- ArtifactContext: add _publisher_index reverse map (kernel_id, name) → node_ids
  so record_deleted and clear_kernel avoid scanning all node states
- Replace asyncio.get_event_loop() with asyncio.get_running_loop() in
  _wait_for_healthy (deprecated since Python 3.10)
- Use datetime.now(timezone.utc) in artifacts.py and models.py instead of
  naive datetime.now()
- Narrow except Exception to specific types: docker.errors.DockerException,
  httpx.HTTPError, OSError, TimeoutError in manager.py
- Add debug logging for health poll failures instead of silent pass
- Replace global _context dict with contextvars.ContextVar in flowfile_client
  for safe concurrent request handling
- Make health timeout configurable via KernelConfig.health_timeout and
  KernelInfo.health_timeout (default 120s), wired through create/start_kernel

https://claude.ai/code/session_01Pz85VwuSqBzovhjEEtweYN
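
The contextvars change is the standard way to make per-request state safe under concurrency. A sketch of what the flowfile_client context handling might look like after the swap (the context payload itself is an assumption):

```python
# Sketch: a ContextVar instead of a module-level dict, so concurrent /execute
# requests each see their own execution context.
from contextvars import ContextVar

_context: ContextVar[dict | None] = ContextVar("flowfile_context", default=None)


def set_context(ctx: dict) -> None:
    _context.set(ctx)


def get_context() -> dict:
    ctx = _context.get()
    if ctx is None:
        raise RuntimeError("flowfile execution context is not set")
    return ctx


def clear_context() -> None:
    _context.set(None)
```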

* fix binding to input_id

* remove breakpoint

* Preserve artifact state for cached nodes and add multi-input integration tests

Snapshot artifact context before clear_all() in run_graph() and restore
state for nodes that were cached/skipped (their _func never re-executed so
record_published was never called). Also adds two integration tests
exercising multi-input python_script nodes: one using read_input() for
union and one using read_first() for single-input access.

https://claude.ai/code/session_01Pz85VwuSqBzovhjEEtweYN

* Allow python_script node to accept multiple main inputs

Change the python_script NodeTemplate input from 1 to 10, matching
polars_code and union nodes. With input=1, add_node_connection always
replaced main_inputs instead of appending, so only the last connection
was retained.

https://claude.ai/code/session_01Pz85VwuSqBzovhjEEtweYN

* adding fix

* Scope artifact restore to graph nodes only

The snapshot/restore logic was restoring artifact state for node IDs
that were not part of the graph (e.g. manually injected via
record_published).

* Add PythonScript node drawer with kernel selection, code editor, and artifacts panel

Implements the frontend drawer UI for the python_script node type:
- Kernel selection dropdown with state indicators and warnings
- CodeMirror editor with Python syntax highlighting and flowfile API autocompletions
- Artifacts panel showing available/published artifacts from kernel
- Help modal documenting the flowfile.* API with examples
- TypeScript types for PythonScriptInput and NodePythonScript
- KernelApi.getArtifacts() method for fetching kernel artifact metadata

https://claude.ai/code/session_017DkcGkambwWqtNQKqMBetg

* Fix published artifacts matching by using correct field name from kernel API

The kernel's /artifacts endpoint returns `node_id` (not `source_node_id`)
to identify which node published each artifact. Updated the frontend to
read the correct field so published artifacts display properly.

https://claude.ai/code/session_017DkcGkambwWqtNQKqMBetg

* add translator

* Split artifacts into available (other nodes) vs published (this node)

Available artifacts should only show artifacts from upstream nodes, not
the current node's own publications. Filter by node_id !== currentNodeId
for available, and node_id === currentNodeId for published.

* Persist kernel configurations in database and clean up on shutdown

Kernels are now stored in a `kernels` table (tied to user_id) so they
survive core process restarts.  On startup the KernelManager restores
persisted configs from the DB, then reclaims any running Docker
containers that match; orphan containers with no DB record are stopped.

All kernel REST routes now require authentication and enforce per-user
ownership (list returns only the caller's kernels, mutations check
ownership before proceeding).

On core shutdown (lifespan handler, SIGTERM, SIGINT) every running
kernel container is stopped and removed via `shutdown_all()`.

https://claude.ai/code/session_01PcxZsx9KTQvHLDvzgAUjzC

* Check Docker image availability before starting a kernel

start_kernel now explicitly checks for the flowfile-kernel image
before attempting to run a container, giving a clear error message
("Docker image 'flowfile-kernel' not found. Please build or pull...")
instead of a raw Docker API exception.

https://claude.ai/code/session_01PcxZsx9KTQvHLDvzgAUjzC

* Allocate port lazily in start_kernel for DB-restored kernels

Kernels restored from the database have port=None since ports are
ephemeral and not persisted. start_kernel now calls _allocate_port()
when kernel.port is None, fixing the "Invalid port: 'None'" error
that occurred when starting a kernel after a core restart.

https://claude.ai/code/session_01PcxZsx9KTQvHLDvzgAUjzC

* Add flowfile.log() method for real-time log streaming from kernel to frontend

Enable Python script nodes to stream log messages to the FlowFile log
viewer in real time via flowfile.log(). The kernel container makes HTTP
callbacks to the core's /raw_logs endpoint, which writes to the
FlowLogger file. The existing SSE streamer picks up new lines and
pushes them to the frontend immediately.

Changes:
- Add log(), log_info(), log_warning(), log_error() to flowfile_client
- Pass flow_id and log_callback_url through ExecuteRequest to kernel
- Add host.docker.internal mapping to kernel Docker containers
- Update RawLogInput schema to support node_id and WARNING level
- Forward captured stdout/stderr to FlowLogger after execution

https://claude.ai/code/session_01Svv5uYus8tnHofhH667KKB
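
The log call itself is a small HTTP callback from the container back to the core. A sketch of flowfile.log() under that model; only the callback mechanism and endpoint named above come from the PR, the payload keys are assumptions.

```python
# Sketch of the callback-based logger; payload keys are assumptions.
import httpx


def log(message: str, level: str = "INFO", *,
        callback_url: str | None = None,
        flow_id: int | None = None,
        node_id: int | None = None) -> None:
    if callback_url is None:
        print(f"[{level}] {message}")  # no callback configured: fall back to stdout
        return
    try:
        httpx.post(
            callback_url,
            json={"flow_id": flow_id, "node_id": node_id,
                  "level": level, "message": message},
            timeout=5.0,
        )
    except httpx.HTTPError:
        # Logging must never break the user's script.
        print(f"[{level}] {message}")
```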

* Add kernel runtime versioning visible in frontend

Add __version__ to the kernel_runtime package (0.2.0) and expose it
through the /health endpoint. The KernelManager reads the version when
the container becomes healthy and stores it on KernelInfo. The frontend
KernelCard displays the version badge next to the kernel ID so users
can verify which image version a running kernel is using.
…291)

* Fix artifact loss in debug mode by implementing selective clearing

Previously, run_graph() cleared ALL artifacts from both the metadata
tracker and kernel memory before every run.  When a node was skipped
(up-to-date), the metadata was restored from a snapshot but the actual
Python objects in kernel memory were already gone.  Downstream nodes
that depended on those artifacts would fail with KeyError.

The fix introduces artifact ownership tracking so that only artifacts
from nodes that will actually re-execute are cleared:

- ArtifactStore: add clear_by_node_ids() and list_by_node_id()
- Kernel runtime: add POST /clear_node_artifacts and GET /artifacts/node/{id}
- KernelManager: add clear_node_artifacts_sync() and get_node_artifacts()
- ArtifactContext: add clear_nodes() for selective metadata clearing
- Kernel routes: add /clear_node_artifacts and /artifacts/node/{id} endpoints
- flow_graph.run_graph(): compute execution plan first, determine which
  python_script nodes will re-run, and only clear those nodes' artifacts.
  Skipped nodes keep their artifacts in both metadata and kernel memory.
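
In store terms, ownership tracking just means remembering which node published each object so that clearing can be scoped. A sketch of an ArtifactStore with clear_by_node_ids() and list_by_node_id(); the method names follow the commits, the internal layout is an assumption.

```python
# Sketch of node-scoped clearing: the store remembers the publishing node so
# only re-running nodes lose their objects. Internals are assumptions.
import threading
from typing import Any


class ArtifactStore:
    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._artifacts: dict[str, tuple[int, Any]] = {}  # name -> (node_id, value)

    def publish(self, name: str, value: Any, node_id: int) -> None:
        with self._lock:
            if name in self._artifacts:
                raise ValueError(f"Artifact '{name}' already exists")
            self._artifacts[name] = (node_id, value)

    def clear_by_node_ids(self, node_ids: set[int]) -> None:
        with self._lock:
            self._artifacts = {
                name: (owner, value)
                for name, (owner, value) in self._artifacts.items()
                if owner not in node_ids
            }

    def list_by_node_id(self, node_id: int) -> list[str]:
        with self._lock:
            return [n for n, (owner, _) in self._artifacts.items() if owner == node_id]
```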

* Add integration tests for debug mode artifact persistence

Tests verify that artifacts survive re-runs when producing nodes are
skipped (up-to-date) and only consuming nodes re-execute, covering the
core bug scenario, multiple artifacts, and producer re-run clearing.

* Auto-clear node's own artifacts before re-execution in /execute

When a node re-executes (e.g., forced refresh, performance mode re-run),
its previously published artifacts are now automatically cleared before
the new code runs. This prevents "Artifact already exists" errors without
requiring manual delete_artifact() calls in user code.

The clearing is scoped to the executing node's own artifacts only —
artifacts from other nodes are untouched.

* Scope artifacts by flow_id so multiple flows sharing a kernel are isolated

The artifact store now keys artifacts by (flow_id, name) instead of just
name. Two flows using the same kernel can each publish an artifact called
"model" without colliding. All artifact operations (publish, read, delete,
list, clear) are flow-scoped transparently via the execution context.
Edwardvaneechoud and others added 12 commits February 2, 2026 18:14

When a python_script node deletes an artifact (via delete_artifact) and
is later re-executed (e.g. after a code change), the upstream producer
node was not being re-run. This meant the deleted artifact was
permanently lost from the kernel's in-memory store, causing a KeyError
on the consumer's read_artifact call.

The fix tracks which node originally published each deleted artifact
(_deletion_origins in ArtifactContext). During the pre-execution phase
in run_graph, if a re-running node previously deleted artifacts, the
original producer nodes are added to the re-run set and their execution
state is marked stale so they actually re-execute and republish.

* Implement service layer for Flow Catalog system

Extract business logic from route handlers into a proper layered
architecture:

- catalog/exceptions.py: Domain-specific exceptions (CatalogError
  hierarchy) replacing inline HTTPException raises in service code
- catalog/repository.py: CatalogRepository Protocol + SQLAlchemy
  implementation abstracting all data access
- catalog/service.py: CatalogService class owning all business logic
  (validation, enrichment, authorization checks)
- catalog/__init__.py: Public package interface

Refactor routes/catalog.py into a thin HTTP adapter that injects
CatalogService via FastAPI Depends, delegates to service methods,
and translates domain exceptions to HTTP responses.

All 33 existing catalog API tests pass with no behavior changes.

https://claude.ai/code/session_017KkbxgQFxELX8fhk3cDYGF

* Address performance and observability concerns

1. Fix N+1 queries in flow listing (4×N → 3 queries):
   - Add bulk_get_favorite_flow_ids, bulk_get_follow_flow_ids,
     bulk_get_run_stats to CatalogRepository
   - Add _bulk_enrich_flows to CatalogService
   - Update list_flows, get_namespace_tree, list_favorites,
     list_following, get_catalog_stats to use bulk enrichment

2. Add tech debt comment for ArtifactStore memory pattern:
   - Document the in-memory storage limitation for large artifacts
   - Suggest future improvements (spill-to-disk, external store)

3. Promote _auto_register_flow logging from debug to info:
   - Users can now see why flows don't appear in catalog
   - Log success and specific failure reasons

4. Improve _run_and_track error handling:
   - Use ERROR level for DB persistence failures
   - Add tracking_succeeded flag with explicit failure message
   - Log successful tracking with run details
   - Add context about flow status in error messages

* Auto-restart stopped/errored kernels on execution instead of raising

When a kernel is in STOPPED or ERROR state and an operation (execute,
clear_artifacts, etc.) is attempted, the KernelManager now automatically
restarts the kernel container instead of raising a RuntimeError. This
handles the common case where a kernel was restored from the database
after a server restart but its container is no longer running.

Changes:
- Add start_kernel_sync() and _wait_for_healthy_sync() for sync callers
- Add _ensure_running() / _ensure_running_sync() helpers that restart
  STOPPED/ERROR kernels and wait for STARTING kernels
- Replace RuntimeError raises in execute, execute_sync, clear_artifacts,
  clear_node_artifacts, and get_node_artifacts with auto-restart calls

https://claude.ai/code/session_01LNkAY67dhbnBpy5sHGXeZL
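
The guard itself is small: on a stopped or errored kernel, restart and wait instead of raising. A sketch of the _ensure_running_sync logic written as a free function over an assumed manager interface:

```python
# Sketch of the auto-restart guard; state strings and method names follow the
# commits, the manager interface here is assumed.
def ensure_running_sync(manager, kernel_id: str) -> None:
    kernel = manager.get_kernel(kernel_id)
    if kernel.state in ("stopped", "error"):
        # Typical case: kernel restored from the DB after a core restart, so
        # its container no longer exists. Start a fresh one instead of raising.
        manager.start_kernel_sync(kernel_id)
    elif kernel.state == "starting":
        manager.wait_for_healthy_sync(kernel_id)
    # "idle" / "executing": the container is already usable, nothing to do.
```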

* adding stream logs

* adding flow logger

* Pass flow_logger through _ensure_running_sync to start_kernel_sync

- _ensure_running_sync now logs restart attempt to flow_logger
- Passes flow_logger to start_kernel_sync so users see kernel
  restart progress in the flow execution log
- Fix bug: error handler was calling logger.error instead of
  flow_logger.error

https://claude.ai/code/session_01LNkAY67dhbnBpy5sHGXeZL

* Pass flow_logger to clear_node_artifacts_sync in flow_graph

https://claude.ai/code/session_01LNkAY67dhbnBpy5sHGXeZL

* Add tests for kernel auto-restart on stopped/errored state

New TestKernelAutoRestart class with 4 tests:
- test_execute_sync_restarts_stopped_kernel
- test_execute_async_restarts_stopped_kernel
- test_clear_node_artifacts_restarts_stopped_kernel
- test_python_script_node_with_stopped_kernel

Each test stops the kernel, then verifies that the operation
auto-restarts it instead of raising RuntimeError.

Document how to build and run the Docker image, API endpoints,
the flowfile module usage for data I/O and artifact management,
and development setup instructions.

Add fsync calls after writing parquet files to ensure they are fully
flushed to disk before being read. This prevents "File must end with
PAR1" errors that occur when the kernel reads input files or the host
reads output files before the PAR1 footer is fully written.

The issue occurs because write_parquet() may leave data in OS buffers,
and when sharing files between host and Docker container via mounted
volumes, the reader can see an incomplete file.
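
The fix boils down to flushing the file before anything on the other side of the volume mount opens it. A sketch of the write-then-fsync pattern:

```python
# Sketch of fsync-after-write so the PAR1 footer is on disk before the file
# is read across the host/container volume boundary.
import os

import polars as pl


def write_parquet_durably(df: pl.DataFrame, path: str) -> None:
    df.write_parquet(path)
    # write_parquet can leave bytes in OS buffers; force them to disk before
    # another process opens the file.
    fd = os.open(path, os.O_RDONLY)
    try:
        os.fsync(fd)
    finally:
        os.close(fd)
```
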
* Add display output support for rich notebook-like rendering

Backend changes:
- Add flowfile.display() function to flowfile_client.py that supports
  matplotlib figures, plotly figures, PIL images, HTML strings, and plain text
- Add DisplayOutput model to ExecuteResponse with mime_type, data, and title
- Patch matplotlib.pyplot.show() to auto-capture figures as display outputs
- Add _maybe_wrap_last_expression() for interactive mode auto-display
- Add interactive flag to ExecuteRequest for cell execution mode
- Add /execute_cell endpoint that enables interactive mode

Frontend changes:
- Add DisplayOutput and ExecuteResult interfaces to kernel.types.ts
- Add executeCell() method to KernelApi class
- Add display() completion to flowfileCompletions.ts

Tests:
- Add comprehensive tests for display(), _reset_displays(), _get_displays()
- Add tests for display output in execute endpoint
- Add tests for interactive mode auto-display behavior

https://claude.ai/code/session_01Qsbq4DBFmEPRMWHio7iCDc
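
At its core, display() renders an object into a (mime_type, data, title) record that rides back on the ExecuteResponse. A sketch of the matplotlib path; the DisplayOutput field names come from the commit, the detection and storage details are assumptions.

```python
# Sketch of display(): render a matplotlib figure to base64 PNG, otherwise
# fall back to plain text. Storage as a module-level list is an assumption.
import base64
import io

_displays: list[dict] = []


def display(obj, title: str | None = None) -> None:
    if hasattr(obj, "savefig"):  # duck-typed matplotlib Figure
        buf = io.BytesIO()
        obj.savefig(buf, format="png", bbox_inches="tight")
        data = base64.b64encode(buf.getvalue()).decode("ascii")
        _displays.append({"mime_type": "image/png", "data": data, "title": title})
    else:
        _displays.append({"mime_type": "text/plain", "data": str(obj), "title": title})
```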

* Fix review issues in display output support

- Add _displays.set([]) to _clear_context() for consistent cleanup
- Fix _is_html_string false-positives by using regex to detect actual HTML tags
- Export DisplayOutput from flowfile_core/kernel/__init__.py
- Change base64 decode to use ascii instead of utf-8
- Simplify _maybe_wrap_last_expression using ast.get_source_segment
- Add tests for HTML false-positive cases (e.g., "x < 10 and y > 5")
- Reduce spacing: settings gap (0.65rem), block gap (0.3rem), label font-size (0.8rem)
- Make kernel selector compact with size="small" and smaller warning box
- Improve editor container with subtle inner shadow and tighter border-radius
- Make artifacts panel lighter with transparent bg and top border only
- Move help button inline with Code label to save vertical space

* Fix read_inputs() to return list of LazyFrames for multiple inputs

Previously, when multiple nodes were connected to a python_script node,
read_inputs() would concatenate all inputs into a single LazyFrame,
making it impossible to distinguish between different input sources.

Now read_inputs() returns dict[str, list[pl.LazyFrame]] where each
entry is a list of LazyFrames, one per connected input.

* Update help and autocomplete docs for read_inputs() return type

- FlowfileApiHelp.vue: Updated description and example to show
  that read_inputs() returns dict of LazyFrame lists
- flowfileCompletions.ts: Updated info and detail to reflect
  the new return type signature

* Fix tests for read_inputs() returning list of LazyFrames

Updated tests to expect dict[str, list[LazyFrame]] return type:
- test_read_inputs_returns_dict: check for list with LazyFrame element
- test_multiple_named_inputs: access inputs via [0] index
- test_read_inputs_with_multiple_main_paths: verify list length and values
- test_multiple_inputs: access inputs via [0] index in code string

* Fix test_multiple_inputs in test_kernel_integration.py

Updated code string to access read_inputs() results via [0] index
since it now returns dict[str, list[LazyFrame]].
…316)

When clicking a python_script node before running the flow, the editor
took ~17s because get_predicted_schema fell through to execution,
triggering upstream pipeline runs and kernel container startup. Add a
schema_callback that returns the input node schema as a best-effort
prediction, matching the pattern used by other node types like output.

* Link global artifacts to registered catalog flows

Every global artifact now requires a source_registration_id that ties it
to a registered catalog flow. The artifact inherits the flow's
namespace_id by default, with the option to override explicitly.

Key changes:
- Add source_registration_id FK column to GlobalArtifact model
- Make source_registration_id required in PrepareUploadRequest schema
- Validate registration exists and inherit namespace_id in artifact service
- Block flow deletion when active (non-deleted) artifacts reference it
- Add FlowHasArtifactsError exception for cascade protection
- Pass source_registration_id through kernel execution context
- Hard-delete soft-deleted artifacts when their flow is deleted
- Add comprehensive tests for all new behaviors (38 tests pass)

https://claude.ai/code/session_01C4qUHycARzswGKpAtEjRjU

* Fix kernel_runtime publish_global tests: add source_registration_id context

The publish_global function now requires source_registration_id in the
execution context. Add autouse fixtures to TestPublishGlobal and
TestGlobalArtifactIntegration that set up the flowfile context with
source_registration_id before each test.

https://claude.ai/code/session_01C4qUHycARzswGKpAtEjRjU

* Fix kernel integration tests: pass source_registration_id in ExecuteRequest

Add test_registration fixture that creates a FlowRegistration in the DB,
and pass its ID through ExecuteRequest and _create_graph for all tests
that call publish_global. This satisfies the required source_registration_id
validation in both the kernel context and Core API.

The PythonScript node's artifact panel was showing all artifacts from the
kernel regardless of DAG structure. When two independent chains shared a
kernel, artifacts from one chain would incorrectly appear as "available"
in the other chain.

The fix adds a backend endpoint that exposes upstream node IDs via DAG
traversal, and updates the frontend to filter kernel artifacts using
this DAG-aware set instead of showing all non-self artifacts.

- Backend: Add GET /flow/node_upstream_ids endpoint
- Frontend: FlowApi.getNodeUpstreamIds() fetches upstream IDs
- Frontend: PythonScript.vue filters artifacts by upstream set
- Tests: Add chain isolation tests for ArtifactContext
- Tests: Add endpoint test for node_upstream_ids
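
The upstream set is a plain transitive walk over incoming edges. A sketch of that traversal on a generic edge list (the real endpoint works against FlowGraph internals, so the edge representation here is an assumption):

```python
# Sketch of DAG-aware upstream discovery via breadth-first traversal.
from collections import deque


def upstream_node_ids(edges: list[tuple[int, int]], node_id: int) -> set[int]:
    """edges are (from_node, to_node) pairs; returns every transitive ancestor."""
    incoming: dict[int, list[int]] = {}
    for src, dst in edges:
        incoming.setdefault(dst, []).append(src)

    seen: set[int] = set()
    queue = deque(incoming.get(node_id, []))
    while queue:
        current = queue.popleft()
        if current in seen:
            continue
        seen.add(current)
        queue.extend(incoming.get(current, []))
    return seen
```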

        # Discover parquet files in the input directory
        if os.path.isdir(input_dir):

Check failure

Code scanning / CodeQL

Uncontrolled data used in path expression (High)

This path depends on a user-provided value.

Copilot Autofix

AI 3 days ago

In general, to fix uncontrolled data in path expressions, you must ensure that any path derived from untrusted data is normalized and then verified to be contained within a designated safe root directory, rejecting or sanitizing inputs that attempt to escape this root (for example using .. or absolute paths).

In this specific case, the best fix with minimal behavior change is to normalize and validate the constructed input_dir and output_dir paths against self._shared_volume. We will:

  • Use os.path.abspath (or os.path.realpath) combined with os.path.join to build input_dir and output_dir from self._shared_volume, flow_id, and node_id.
  • After normalization, verify that both input_dir and output_dir are within self._shared_volume by checking their common path prefix with os.path.commonpath.
  • If either path falls outside self._shared_volume, raise a RuntimeError to be turned into a 400 response by the existing exception handler in routes.py.
  • Optionally, coerce flow_id and node_id to strings and rely on this normalization/containment check, without changing their semantics.

All necessary imports (os) already exist at the top of flowfile_core/flowfile_core/kernel/manager.py, so no new imports are required. The changes are localized to the resolve_node_paths method in manager.py.

Concretely, in flowfile_core/flowfile_core/kernel/manager.py around lines 192–203, we will:

  1. Compute base_dir = os.path.abspath(self._shared_volume).
  2. Build input_dir and output_dir with os.path.abspath(os.path.join(...)).
  3. Validate both using os.path.commonpath([base_dir, input_dir]) == base_dir (and similarly for output_dir).
  4. Raise a RuntimeError if validation fails.

This preserves existing functionality for legitimate flow/node IDs while preventing path traversal outside the shared volume.

Suggested changeset 1
flowfile_core/flowfile_core/kernel/manager.py

Autofix patch
Run the following command in your local git repository to apply this patch
cat << 'EOF' | git apply
diff --git a/flowfile_core/flowfile_core/kernel/manager.py b/flowfile_core/flowfile_core/kernel/manager.py
--- a/flowfile_core/flowfile_core/kernel/manager.py
+++ b/flowfile_core/flowfile_core/kernel/manager.py
@@ -189,19 +189,31 @@
         if request.input_paths or not request.flow_id or not request.node_id:
             return
 
-        input_dir = os.path.join(
-            self._shared_volume,
-            str(request.flow_id),
-            str(request.node_id),
-            "inputs",
+        base_dir = os.path.abspath(self._shared_volume)
+        input_dir = os.path.abspath(
+            os.path.join(
+                base_dir,
+                str(request.flow_id),
+                str(request.node_id),
+                "inputs",
+            )
         )
-        output_dir = os.path.join(
-            self._shared_volume,
-            str(request.flow_id),
-            str(request.node_id),
-            "outputs",
+        output_dir = os.path.abspath(
+            os.path.join(
+                base_dir,
+                str(request.flow_id),
+                str(request.node_id),
+                "outputs",
+            )
         )
 
+        # Ensure the resolved paths stay within the shared volume
+        if (
+            os.path.commonpath([base_dir, input_dir]) != base_dir
+            or os.path.commonpath([base_dir, output_dir]) != base_dir
+        ):
+            raise RuntimeError("Invalid flow/node identifiers for path resolution")
+
         # Discover parquet files in the input directory
         if os.path.isdir(input_dir):
             parquet_files = sorted(
EOF
        # Discover parquet files in the input directory
        if os.path.isdir(input_dir):
            parquet_files = sorted(
                f for f in os.listdir(input_dir) if f.endswith(".parquet")

Check failure

Code scanning / CodeQL

Uncontrolled data used in path expression (High)

This path depends on a user-provided value.

Copilot Autofix

AI 3 days ago

General approach: Ensure that any path constructed from user-controlled data is restricted to a known safe root. Since this code already uses a shared root (self._shared_volume), the safest and least invasive fix is to normalize the derived directories and verify that they reside within self._shared_volume before accessing the filesystem. This avoids directory traversal even if flow_id or node_id contain .., slashes, or absolute paths.

Best concrete fix: In KernelManager.resolve_node_paths (the method shown), after computing input_dir and output_dir, call os.path.abspath on each and verify they start with the absolute form of self._shared_volume plus a path separator (or are exactly equal). If either directory falls outside the shared root, raise a RuntimeError so the calling FastAPI handler can return a 400 via the existing except RuntimeError clauses. Then use the normalized safe paths for os.path.isdir, os.listdir, and for constructing request.input_paths and request.output_dir. This preserves current behavior for valid IDs but prevents crafted IDs from escaping the shared volume.

Concretely in flowfile_core/flowfile_core/kernel/manager.py:

  • In resolve_node_paths, after the initial early return, compute base_root = os.path.abspath(self._shared_volume).
  • Build input_dir and output_dir as before, then normalize them via os.path.abspath.
  • Add a small helper inside the method (or just inline logic) that checks that each normalized path starts with base_root + os.sep or equals base_root. If not, raise RuntimeError("Invalid flow_id/node_id path") (or similar).
  • Replace subsequent uses of input_dir and output_dir with the normalized versions for os.path.isdir, os.listdir, and self.to_kernel_path.

No other files require modification; the FastAPI routes already handle RuntimeError from resolve_node_paths.

Suggested changeset 1
flowfile_core/flowfile_core/kernel/manager.py

Autofix patch
Run the following command in your local git repository to apply this patch
cat << 'EOF' | git apply
diff --git a/flowfile_core/flowfile_core/kernel/manager.py b/flowfile_core/flowfile_core/kernel/manager.py
--- a/flowfile_core/flowfile_core/kernel/manager.py
+++ b/flowfile_core/flowfile_core/kernel/manager.py
@@ -189,6 +189,8 @@
         if request.input_paths or not request.flow_id or not request.node_id:
             return
 
+        base_root = os.path.abspath(self._shared_volume)
+
         input_dir = os.path.join(
             self._shared_volume,
             str(request.flow_id),
@@ -202,20 +204,29 @@
             "outputs",
         )
 
+        # Normalize and validate that the derived paths stay within the shared volume
+        safe_input_dir = os.path.abspath(input_dir)
+        safe_output_dir = os.path.abspath(output_dir)
+
+        for candidate in (safe_input_dir, safe_output_dir):
+            # Ensure candidate is under base_root (or equal to it)
+            if not (candidate == base_root or candidate.startswith(base_root + os.sep)):
+                raise RuntimeError("Invalid flow_id/node_id path")
+
         # Discover parquet files in the input directory
-        if os.path.isdir(input_dir):
+        if os.path.isdir(safe_input_dir):
             parquet_files = sorted(
-                f for f in os.listdir(input_dir) if f.endswith(".parquet")
+                f for f in os.listdir(safe_input_dir) if f.endswith(".parquet")
             )
             if parquet_files:
                 request.input_paths = {
                     "main": [
-                        self.to_kernel_path(os.path.join(input_dir, f))
+                        self.to_kernel_path(os.path.join(safe_input_dir, f))
                         for f in parquet_files
                     ]
                 }
 
-        request.output_dir = self.to_kernel_path(output_dir)
+        request.output_dir = self.to_kernel_path(safe_output_dir)
 
     def _build_run_kwargs(self, kernel_id: str, kernel: KernelInfo, env: dict) -> dict:
         """Build Docker ``containers.run()`` keyword arguments.
EOF
The add_python_script method was calling collect().write_parquet() directly
on the core process, which is undesirable for performance. This change
offloads the collect and parquet writing to the worker process using the
existing ExternalDfFetcher infrastructure.

Changes:
- Add write_parquet operation to worker funcs.py that deserializes a
  LazyFrame, collects it, and writes to a specified parquet path with fsync
- Add write_parquet to OperationType in both worker and core models
- Add kwargs support to ExternalDfFetcher and trigger_df_operation so
  custom parameters (like output_path) can be passed through both
  WebSocket streaming and REST fallback paths
- Update REST /submit_query/ endpoint to read kwargs from X-Kwargs header
- Replace direct collect().write_parquet() in add_python_script with
  ExternalDfFetcher using the new write_parquet operation type

Scope pytest collection to tests/integration/ to avoid importing
conftest.py from flowfile_core/tests, flowfile_frame/tests, and
flowfile_worker/tests, which fail with ModuleNotFoundError due to
ambiguous 'tests' package names. Also remove a stray breakpoint()
that would hang CI.

* Add kernel memory usage display and OOM detection

Display live container memory usage near the kernel selector in the
Python Script node and in the Kernel Manager cards. The kernel runtime
reads cgroup v1/v2 memory stats, flowfile_core proxies them, and the
frontend polls every 3 seconds with color-coded warnings at 80% (yellow)
and 95% (red). When a container is OOM-killed during execution, the
error is detected via Docker inspect and surfaced as a clear
"Kernel ran out of memory" message instead of a generic connection error.

https://claude.ai/code/session_0183iqhJnXhyfbx4VrqjXscR
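
Reading usage from cgroups amounts to checking the v2 files first and falling back to v1. A sketch of that lookup; the file paths are the standard cgroup locations, everything around them is assumed.

```python
# Sketch of container memory stats from cgroup v2 with a v1 fallback.
from pathlib import Path


def read_memory_stats() -> dict[str, int | None]:
    v2_usage = Path("/sys/fs/cgroup/memory.current")
    v2_limit = Path("/sys/fs/cgroup/memory.max")
    v1_usage = Path("/sys/fs/cgroup/memory/memory.usage_in_bytes")
    v1_limit = Path("/sys/fs/cgroup/memory/memory.limit_in_bytes")

    def read_int(path: Path) -> int | None:
        try:
            text = path.read_text().strip()
        except OSError:
            return None
        return None if text == "max" else int(text)  # v2 uses "max" for unlimited

    if v2_usage.exists():
        return {"usage_bytes": read_int(v2_usage), "limit_bytes": read_int(v2_limit)}
    return {"usage_bytes": read_int(v1_usage), "limit_bytes": read_int(v1_limit)}
```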

* Fix 500 errors on kernel memory stats endpoint

Catch httpx/OS errors in get_memory_stats and convert them to
RuntimeError so the route handler returns a proper 400 instead of
an unhandled 500. This prevents console spam when the kernel runtime
container hasn't been rebuilt with the /memory endpoint yet.

When a flow is restored (undo/redo or loaded from YAML/JSON), the
source_registration_id was not included in the FlowfileSettings
serialization schema. This caused publish_global to fail with
"source_registration_id is required" because the kernel received
None instead of the catalog registration ID.

Changes:
- Add source_registration_id to FlowfileSettings schema
- Include source_registration_id in get_flowfile_data() serialization
- Include source_registration_id in _flowfile_data_to_flow_information()
  deserialization
- Preserve source_registration_id in restore_from_snapshot() so undo/redo
  doesn't lose it even when the snapshot predates the registration